1 package org.apache.lucene.replicator;
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import java.io.IOException;
21 import java.util.Collections;
22 import java.util.HashSet;
23 import java.util.List;
24 import java.util.Map;
25 import java.util.Set;
26 import java.util.concurrent.Callable;
27 import java.util.regex.Matcher;
28
29 import org.apache.lucene.index.DirectoryReader;
30 import org.apache.lucene.index.IndexCommit;
31 import org.apache.lucene.index.IndexFileNames;
32 import org.apache.lucene.index.IndexNotFoundException;
33 import org.apache.lucene.index.IndexWriter;
34 import org.apache.lucene.replicator.ReplicationClient.ReplicationHandler;
35 import org.apache.lucene.store.Directory;
36 import org.apache.lucene.store.IOContext;
37 import org.apache.lucene.util.IOUtils;
38 import org.apache.lucene.util.InfoStream;
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59 public class IndexReplicationHandler implements ReplicationHandler {
60
61
62
63
64
65 public static final String INFO_STREAM_COMPONENT = "IndexReplicationHandler";
66
67 private final Directory indexDir;
68 private final Callable<Boolean> callback;
69
70 private volatile Map<String,List<RevisionFile>> currentRevisionFiles;
71 private volatile String currentVersion;
72 private volatile InfoStream infoStream = InfoStream.getDefault();
73
74
75
76
77
78 public static IndexCommit getLastCommit(Directory dir) throws IOException {
79 try {
80 if (DirectoryReader.indexExists(dir)) {
81 List<IndexCommit> commits = DirectoryReader.listCommits(dir);
82
83
84 return commits.get(commits.size() - 1);
85 }
86 } catch (IndexNotFoundException e) {
87
88 }
89 return null;
90 }
91
92
93
94
95
96
97
98
99
100
101
102 public static String getSegmentsFile(List<String> files, boolean allowEmpty) {
103 if (files.isEmpty()) {
104 if (allowEmpty) {
105 return null;
106 } else {
107 throw new IllegalStateException("empty list of files not allowed");
108 }
109 }
110
111 String segmentsFile = files.remove(files.size() - 1);
112 if (!segmentsFile.startsWith(IndexFileNames.SEGMENTS) || segmentsFile.equals(IndexFileNames.OLD_SEGMENTS_GEN)) {
113 throw new IllegalStateException("last file to copy+sync must be segments_N but got " + segmentsFile
114 + "; check your Revision implementation!");
115 }
116 return segmentsFile;
117 }
118
119
120
121
122
123 public static void cleanupFilesOnFailure(Directory dir, List<String> files) {
124 for (String file : files) {
125
126
127 IOUtils.deleteFilesIgnoringExceptions(dir, file);
128 }
129 }
130
131
132
133
134
135
136
137
138
139
140
141 public static void cleanupOldIndexFiles(Directory dir, String segmentsFile, InfoStream infoStream) {
142 try {
143 IndexCommit commit = getLastCommit(dir);
144
145
146
147 if (commit != null && commit.getSegmentsFileName().equals(segmentsFile)) {
148 Set<String> commitFiles = new HashSet<>();
149 commitFiles.addAll(commit.getFileNames());
150 Matcher matcher = IndexFileNames.CODEC_FILE_PATTERN.matcher("");
151 for (String file : dir.listAll()) {
152 if (!commitFiles.contains(file)
153 && (matcher.reset(file).matches() || file.startsWith(IndexFileNames.SEGMENTS))) {
154
155 IOUtils.deleteFilesIgnoringExceptions(dir, file);
156 }
157 }
158 }
159 } catch (Throwable t) {
160
161
162
163 if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
164 infoStream.message(INFO_STREAM_COMPONENT, "cleanupOldIndexFiles(): failed on error " + t.getMessage());
165 }
166 }
167 }
168
169
170
171
172
173 public static void copyFiles(Directory source, Directory target, List<String> files) throws IOException {
174 if (!source.equals(target)) {
175 for (String file : files) {
176 target.copyFrom(source, file, file, IOContext.READONCE);
177 }
178 }
179 }
180
181
182
183
184
185 public IndexReplicationHandler(Directory indexDir, Callable<Boolean> callback) throws IOException {
186 this.callback = callback;
187 this.indexDir = indexDir;
188 currentRevisionFiles = null;
189 currentVersion = null;
190 if (DirectoryReader.indexExists(indexDir)) {
191 final List<IndexCommit> commits = DirectoryReader.listCommits(indexDir);
192 final IndexCommit commit = commits.get(commits.size() - 1);
193 currentRevisionFiles = IndexRevision.revisionFiles(commit);
194 currentVersion = IndexRevision.revisionVersion(commit);
195 final InfoStream infoStream = InfoStream.getDefault();
196 if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
197 infoStream.message(INFO_STREAM_COMPONENT, "constructor(): currentVersion=" + currentVersion
198 + " currentRevisionFiles=" + currentRevisionFiles);
199 infoStream.message(INFO_STREAM_COMPONENT, "constructor(): commit=" + commit);
200 }
201 }
202 }
203
204 @Override
205 public String currentVersion() {
206 return currentVersion;
207 }
208
209 @Override
210 public Map<String,List<RevisionFile>> currentRevisionFiles() {
211 return currentRevisionFiles;
212 }
213
214 @Override
215 public void revisionReady(String version, Map<String,List<RevisionFile>> revisionFiles,
216 Map<String,List<String>> copiedFiles, Map<String,Directory> sourceDirectory) throws IOException {
217 if (revisionFiles.size() > 1) {
218 throw new IllegalArgumentException("this handler handles only a single source; got " + revisionFiles.keySet());
219 }
220
221 Directory clientDir = sourceDirectory.values().iterator().next();
222 List<String> files = copiedFiles.values().iterator().next();
223 String segmentsFile = getSegmentsFile(files, false);
224 String pendingSegmentsFile = "pending_" + segmentsFile;
225
226 boolean success = false;
227 try {
228
229 copyFiles(clientDir, indexDir, files);
230
231
232 indexDir.sync(files);
233
234
235 indexDir.copyFrom(clientDir, segmentsFile, pendingSegmentsFile, IOContext.READONCE);
236 indexDir.sync(Collections.singletonList(pendingSegmentsFile));
237 indexDir.renameFile(pendingSegmentsFile, segmentsFile);
238
239 success = true;
240 } finally {
241 if (!success) {
242 files.add(segmentsFile);
243 files.add(pendingSegmentsFile);
244 cleanupFilesOnFailure(indexDir, files);
245 }
246 }
247
248
249 currentRevisionFiles = revisionFiles;
250 currentVersion = version;
251
252 if (infoStream.isEnabled(INFO_STREAM_COMPONENT)) {
253 infoStream.message(INFO_STREAM_COMPONENT, "revisionReady(): currentVersion=" + currentVersion
254 + " currentRevisionFiles=" + currentRevisionFiles);
255 }
256
257
258
259
260
261
262 cleanupOldIndexFiles(indexDir, segmentsFile, infoStream);
263
264
265
266 if (callback != null) {
267 try {
268 callback.call();
269 } catch (Exception e) {
270 throw new IOException(e);
271 }
272 }
273 }
274
275
276 public void setInfoStream(InfoStream infoStream) {
277 if (infoStream == null) {
278 infoStream = InfoStream.NO_OUTPUT;
279 }
280 this.infoStream = infoStream;
281 }
282
283 }